In [1]:
# Initializing Spark in Python
from pyspark import SparkContext
sc = SparkContext("local", "Work with key-value")

KEY-VALUE


In [11]:
lines = sc.parallelize(["hello world", "hi 123", "good morning"])

In [12]:
pairs = lines.map(lambda x: (x.split(" ")[0], x))

In [17]:
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)
result.first()


Out[17]:
('hello', 'hello world')

Example


In [20]:
rdd = sc.parallelize({(1, 2), (3, 4), (3, 6)})

In [22]:
result = rdd.reduceByKey(lambda x, y: x + y)
result.first()


Out[22]:
(1, 2)

In [26]:
result = rdd.groupByKey()
result.first()


Out[26]:
(1, <pyspark.resultiterable.ResultIterable at 0x7fede1727810>)
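
groupByKey wraps each key's values in a ResultIterable; a minimal way to inspect them (a sketch, not part of the original run) is to convert the iterables to lists:

# sketch: materialize the grouped values as plain lists
rdd.groupByKey().mapValues(list).collect()
# e.g. [(1, [2]), (3, [4, 6])]  (value order may vary)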

In [27]:
result = rdd.mapValues(lambda x: x + 1)
result.first()


Out[27]:
(1, 3)

In [31]:
result = rdd.keys()
result.first()


Out[31]:
1

In [32]:
result = rdd.values()
result.first()


Out[32]:
2

In [33]:
result = rdd.sortByKey()
result.first()


Out[33]:
(1, 2)

In [34]:
rdd = sc.parallelize({(1, 2), (3, 4), (3, 6)})
other = sc.parallelize({(3, 9)})

In [37]:
result = rdd.subtractByKey(other)
result.first()


Out[37]:
(1, 2)

In [38]:
result = rdd.join(other)
result.first()


Out[38]:
(3, (4, 9))

In [39]:
result = rdd.rightOuterJoin(other)
result.first()


Out[39]:
(3, (4, 9))

In [41]:
result = rdd.leftOuterJoin(other)
result.first()


Out[41]:
(1, (2, None))

In [42]:
result = rdd.cogroup(other)
result.first()


Out[42]:
(1,
 (<pyspark.resultiterable.ResultIterable at 0x7fede16ba990>,
  <pyspark.resultiterable.ResultIterable at 0x7fede16ba290>))

Aggregation


In [46]:
result = rdd.mapValues(lambda x: (x, 1)).reduceByKey(
  lambda x, y: (x[0] + y[0], x[1] + y[1]))
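
This leaves a (sum, count) pair per key; dividing the two then yields the average (a follow-up sketch, not in the original cell):

# sketch: turn (sum, count) into the per-key average
averages = result.mapValues(lambda sc_: sc_[0] / sc_[1])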

In [53]:
# counting words
text = sc.textFile("log.txt")
words = text.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
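
As an aside (not from the original run), the same word count can be obtained in one step with countByValue, which returns the counts as a local dict:

# sketch: word count without an explicit reduceByKey
counts = text.flatMap(lambda line: line.split(" ")).countByValue()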

In [67]:
# Computing the average value for each key
sumCount = rdd.combineByKey(
    (lambda x: (x, 1)),                          # createCombiner: start a (sum, count) pair
    (lambda acc, x: (acc[0] + x, acc[1] + 1)),   # mergeValue: fold a value into the pair
    (lambda a, b: (a[0] + b[0], a[1] + b[1])))   # mergeCombiners: merge pairs across partitions

avg = sumCount.mapValues(lambda xy: xy[0] / xy[1])
sumCount.collectAsMap()


Out[67]:
{1: (2, 1), 3: (10, 2)}
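
Reading back the averages themselves (derived from the cell above, not an original Out):

# sketch: the per-key averages for this rdd
avg.collectAsMap()
# -> {1: 2.0, 3: 5.0}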

Setting the level of parallelism


In [69]:
data = [("a", 3), ("b", 4), ("a", 1)]

In [72]:
# Default
result = sc.parallelize(data).reduceByKey(lambda x, y: x + y)
result.first()


Out[72]:
('a', 4)

In [74]:
# Custom
result = sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10)
result.first()


Out[74]:
('b', 4)

In [77]:
rdd.getNumPartitions()


Out[77]:
1
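
The partitioning of an existing pair RDD can also be changed explicitly; a minimal sketch (the partition count is illustrative):

# sketch: hash-partition the pair RDD by key into 4 partitions
repartitioned = rdd.partitionBy(4)
repartitioned.getNumPartitions()
# -> 4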

Sort


In [79]:
result = rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))
result.first()


Out[79]:
(1, 2)
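
Passing keyfunc=lambda x: str(x) makes sortByKey compare the keys as strings; with the single-digit keys here the order is unchanged, but mixed magnitudes would sort lexicographically (an illustrative sketch, not in the original run):

# sketch: string comparison puts 10 before 2, since "10" < "2"
sc.parallelize([(10, "a"), (2, "b")]).sortByKey(keyfunc=lambda x: str(x)).keys().collect()
# -> [10, 2]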

In [80]:
sc.stop()